.NET: fix: HandoffAgentExecutor does not output any response when non-streaming#4745
.NET: fix: HandoffAgentExecutor does not output any response when non-streaming#4745
Conversation
There was a problem hiding this comment.
Pull request overview
Updates the specialized HandoffAgentExecutor to reliably produce workflow output in non-streaming mode (addressing the gap that existed compared to AIAgentHostExecutor improvements), and adds unit coverage to prevent regressions.
Changes:
- Emit a final
AgentResponseoutput fromHandoffAgentExecutorwhenTurnToken.EmitEventsis nottrue(i.e., non-streaming). - Refactor existing
AIAgentHostExecutorunit tests to share common assertions via a new base class. - Add a new unit test validating
HandoffAgentExecutoroutput behavior acrossTurnToken.EmitEventsvalues.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs | Emits a final AgentResponse output in non-streaming mode while preserving streaming-update behavior when enabled. |
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIAgentHostExecutorTests.cs | Extracts shared verification helpers into a base class and adds coverage for HandoffAgentExecutor output type selection. |
You can also share your feedback on Copilot code review. Take the survey.
There was a problem hiding this comment.
Pull request overview
This PR addresses missing agent output in non-streaming Handoff workflows when hosted as an AIAgent, ensuring a final AgentResponse is emitted (vs. only streaming updates), and adds coverage for history persistence.
Changes:
- Update
HandoffAgentExecutorto emit anAgentResponsewhenTurnToken.EmitEventsis nottrue(treating the flag as “streaming vs. non-streaming” output selection). - Persist merged outgoing messages into the workflow session chat history for both streaming and non-streaming agent runs.
- Refactor/extend unit tests to cover the expected output type and that outgoing messages are stored in history.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs | Adds smoke coverage that outgoing messages are persisted to workflow session history; updates test inheritance. |
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIAgentHostExecutorTests.cs | Refactors shared assertions into a base class and adds targeted HandoffAgentExecutor output-type tests. |
| dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs | Persists merged responses into workflow session chat history after running (streaming + non-streaming). |
| dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowChatHistoryProvider.cs | Adds an accessor to retrieve all messages from session state (used by tests). |
| dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs | Emits a final AgentResponse in non-streaming mode while continuing to stream updates when configured. |
You can also share your feedback on Copilot code review. Take the survey.
dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs
Show resolved
Hide resolved
dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs
Outdated
Show resolved
Hide resolved
45f0d3e to
f11fbf0
Compare
f11fbf0 to
9665b2d
Compare
9665b2d to
68976a1
Compare
chetantoshniwal
left a comment
There was a problem hiding this comment.
Automated Code Review
Reviewers: 4 | Confidence: 86%
✗ Correctness
This PR refactors chat history management by moving
AddMessagesandUpdateBookmarkcalls from per-update (insideWorkflowSession.CreateUpdateandInvokeStageAsync's finally block) to post-completion (inWorkflowHostAgent'sRunCoreAsync/RunCoreStreamingAsync). It also adds a new output mode forHandoffAgentExecutorwhere the fullAgentResponseis yielded when streaming updates are not requested. The changes are largely sound, but the streaming path inRunCoreStreamingAsynclacks a try/finally guard, meaning chat history and bookmark updates are silently skipped if the consumer disposes the async enumerator early or cancellation occurs—a behavioral regression from the oldfinallyblock inInvokeStageAsync.
✗ Security Reliability
The diff moves chat-history persistence (AddMessages + UpdateBookmark) out of WorkflowSession.InvokeStageAsync (which had a try/finally) into the callers in WorkflowHostAgent. The non-streaming path (RunCoreAsync) is fine because the method is not an async iterator and fully consumes the enumerable. However, RunCoreStreamingAsync is an async iterator (it uses yield return), and the post-loop AddMessages/UpdateBookmark calls are placed after the await foreach loop without a try/finally. In C#, when a consumer disposes an async enumerator early (break, cancellation, exception), code after the yield-return loop is skipped—only pending finally blocks execute. This means on early termination the response messages are never written to history and the bookmark is never advanced, leaving the session in an inconsistent state. This is a reliability regression from the old code whose finally block guaranteed bookmark updates on any exit path.
✗ Test Coverage
The diff moves chat-history bookkeeping out of WorkflowSession.CreateUpdate/InvokeStageAsync and into WorkflowHostAgent.RunCoreAsync and the new RunCoreStreamingAsync path. Two new smoke tests verify that outgoing messages appear in the chat history after RunAsync (non-streaming), and a new HandoffAgentExecutor test covers the emit-events logic. However, there is no corresponding streaming-path test for the chat-history/bookmark change in RunCoreStreamingAsync, which is the most significant behavioural addition in this PR. The removal of the try/finally around UpdateBookmark also changes error-path behaviour that is untested.
✗ Design Approach
The diff moves chat history management from incremental per-update writes (inside
WorkflowSession.CreateUpdate) to a deferred batch write after the full turn completes (inWorkflowHostAgent). This is a cleaner design in principle, but the streaming path has a correctness gap: history and bookmark updates only run after theawait foreachloop, with notry/finallyguard. If a consumer disposes the streamingIAsyncEnumerableearly, those commits never execute—yetLastCheckpointcan be advanced mid-stream (viaSuperStepCompletedEvent). This creates a divergence between workflow checkpoint state (advanced) and chat history bookmark (stale), potentially causing re-delivery of already-processed messages on the next turn. The oldfinallyblock inInvokeStageAsyncexisted precisely to prevent this. Separately,HandoffAgentExecutorrepurposesTurnToken.EmitEvents—documented as controlling whether agent run events are raised—to also mean 'emit a batched full response instead of streaming updates'. The inline comment acknowledges this is a workaround due to a configuration limitation, which is itself a flag for a deeper design issue rather than a fix.
Flagged Issues
- In
WorkflowHostAgent.RunCoreStreamingAsync,AddMessagesandUpdateBookmarkare placed after theawait foreachloop with notry/finally. Since this is an async iterator (usesyield return), if the consumer disposes the enumerator early (cancellation,break, or exception), post-loop code is skipped—only pendingfinallyblocks execute. Meanwhile,WorkflowSession.LastCheckpointcan already be advanced mid-stream (bySuperStepCompletedEvent), so skippingUpdateBookmarkleaves a divergence: the workflow has moved past a checkpoint but the history bookmark has not, causing re-delivery of already-consumed messages on the next turn. The oldfinallyblock inInvokeStageAsyncprevented exactly this. Atry/finallyaround the loop is needed to ensure chat history is updated on all exit paths. - The new smoke tests (
Test_SingleAgent_AsAgent_OutgoingMessagesInHistoryAsync,Test_Handoffs_AsAgent_OutgoingMessagesInHistoryAsync) both callRunAsync, exercising only the non-streaming path. The streaming path inRunCoreStreamingAsynccontains identical new logic (MessageMergeraccumulation,AddMessages,UpdateBookmark) but has zero direct test coverage.
Suggestions
- Add streaming-path equivalents of the two new smoke tests that call
RunStreamingAsync, consume all updates, and verify the session's chat history and bookmark are updated afterwards. - The refactor also changes error-path behaviour:
UpdateBookmarkwas previously in afinallyblock insideInvokeStageAsync, guaranteeing it ran even when the enumeration threw. It now sits after theforeachloop. Consider adding a test that verifies the intended behaviour when the inner workflow throws—is it intentional that the bookmark is not updated in that case? HandoffAgentExecutorrepurposesTurnToken.EmitEvents(documented as controlling whether agent run events are raised) to also decide whether to emit a full batchedAgentResponse—a distinct concept. This will break callers that setEmitEvents = falseto silence streaming events but do not expect a batched response injected into the output stream. A dedicated option onHandoffAgentExecutorOptionsthat controls batch-vs-streaming output mode would express the intent without overloading an unrelated flag's semantics.
Automated review by chetantoshniwal's agents
| await this.ValidateWorkflowAsync().ConfigureAwait(false); | ||
|
|
||
| WorkflowSession workflowSession = await this.UpdateSessionAsync(messages, session, cancellationToken).ConfigureAwait(false); | ||
| MessageMerger merger = new(); | ||
|
|
||
| await foreach (AgentResponseUpdate update in workflowSession.InvokeStageAsync(cancellationToken) | ||
| .ConfigureAwait(false) | ||
| .WithCancellation(cancellationToken)) | ||
| { | ||
| merger.AddUpdate(update); | ||
| yield return update; | ||
| } | ||
|
|
||
| AgentResponse response = merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name); | ||
| workflowSession.ChatHistoryProvider.AddMessages(workflowSession, response.Messages); | ||
| workflowSession.ChatHistoryProvider.UpdateBookmark(workflowSession); |
There was a problem hiding this comment.
RunCoreStreamingAsync is an async iterator. If the caller breaks out of enumeration early (cancellation, break, exception), code after the await foreach loop is skipped—only pending finally blocks execute. This means AddMessages and UpdateBookmark are silently skipped on early termination. Worse, WorkflowSession.LastCheckpoint can already be advanced mid-stream (when SuperStepCompletedEvent fires), so the next call to GetFromBookmark will return messages the workflow has already consumed, causing a checkpoint/bookmark divergence. The old InvokeStageAsync finally block prevented this. Wrap the loop and post-loop code in a try/finally:
| await this.ValidateWorkflowAsync().ConfigureAwait(false); | |
| WorkflowSession workflowSession = await this.UpdateSessionAsync(messages, session, cancellationToken).ConfigureAwait(false); | |
| MessageMerger merger = new(); | |
| await foreach (AgentResponseUpdate update in workflowSession.InvokeStageAsync(cancellationToken) | |
| .ConfigureAwait(false) | |
| .WithCancellation(cancellationToken)) | |
| { | |
| merger.AddUpdate(update); | |
| yield return update; | |
| } | |
| AgentResponse response = merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name); | |
| workflowSession.ChatHistoryProvider.AddMessages(workflowSession, response.Messages); | |
| workflowSession.ChatHistoryProvider.UpdateBookmark(workflowSession); | |
| WorkflowSession workflowSession = await this.UpdateSessionAsync(messages, session, cancellationToken).ConfigureAwait(false); | |
| MessageMerger merger = new(); | |
| try | |
| { | |
| await foreach (AgentResponseUpdate update in workflowSession.InvokeStageAsync(cancellationToken) | |
| .ConfigureAwait(false) | |
| .WithCancellation(cancellationToken)) | |
| { | |
| merger.AddUpdate(update); | |
| yield return update; | |
| } | |
| } | |
| finally | |
| { | |
| AgentResponse response = merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name); | |
| workflowSession.ChatHistoryProvider.AddMessages(workflowSession, response.Messages); | |
| workflowSession.ChatHistoryProvider.UpdateBookmark(workflowSession); | |
| } |
| ChatMessage[] sessionMessages = workflowSession.ChatHistoryProvider.GetAllMessages(workflowSession).ToArray(); | ||
|
|
||
| // Since we never sent an incoming message, the expectation is that there should be nothing in the session | ||
| // except the response |
There was a problem hiding this comment.
This test validates the non-streaming path (RunAsync). The streaming path in RunCoreStreamingAsync now has identical AddMessages + UpdateBookmark logic (new in this PR) but no test. Please add a streaming variant that calls RunStreamingAsync, consumes all updates, then asserts GetAllMessages matches the streamed content.
| ChatMessage[] responseMessages = response.Messages.ToArray(); | ||
| ChatMessage[] sessionMessages = workflowSession.ChatHistoryProvider.GetAllMessages(workflowSession).ToArray(); | ||
|
|
||
| // Since we never sent an incoming message, the expectation is that there should be nothing in the session |
There was a problem hiding this comment.
Same gap: the handoff-workflow history test only covers RunAsync. A streaming variant is needed to cover the new RunCoreStreamingAsync bookmark/history logic.
| if (message.TurnToken.EmitEvents is not true) | ||
| { | ||
| await context.YieldOutputAsync(agentResponse, cancellationToken).ConfigureAwait(false); | ||
| } |
There was a problem hiding this comment.
TurnToken.EmitEvents is documented as controlling whether agent run events (streaming updates) are raised. Here it is repurposed to also decide whether to emit a full batched AgentResponse—a distinct concept. This will break callers that set EmitEvents = false to silence streaming events but do not expect a batched response injected into the output stream. A dedicated option on HandoffAgentExecutorOptions that controls batch-vs-streaming output mode would express the intent without overloading an unrelated flag's semantics.
Motivation and Context
Due to needing custom logic and a more delocalized state, the
HandoffAgentExecutoris still separate from the generalAIAgentHostExecutor. The fixes made to theAIAgentHostExecutorto better support both streaming and non-streaming agent invocation modes and separate configuration for outputs (streaming vs. nonstream, output vs. no output) in PR #3142 did not get ported toHandoffAgentExecutorin hopes of unifying the two.Description
To better support Handoff workflows running in non-streaming mode (and in particular when hosted as an AIAgent), we change the
HandoffAgentExecutorto always emit anAgentRunResponseif it is not configured toemitEventson theTurnToken, which should be more properly calledemitStreamingUpdatesnow thatAgentRunResponseEventexists.Fixes #2423
Contribution Checklist
Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.